package com.itmuch.cloud.study.user.controller;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.client.WebClient;

import com.itmuch.cloud.study.user.entity.User;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;

@Slf4j
@Api(tags = "负载均衡节点")
@RestController
@RequestMapping("/node")
public class NodeController
{
    @Autowired
    private WebClient webClient;
    
    @Value("${microservice-ribbon-user.ribbon.listOfServers}")
    private List<String> listOfServers;
    
    private ExecutorService executorService = Executors.newFixedThreadPool(10);
    
    @ApiOperation("查询用户")
    @GetMapping("/user/{id}")
    public List<User> findById(@PathVariable Long id)
        throws InterruptedException
    {
        // WebClient支持异步
        List<User> users = new CopyOnWriteArrayList<User>();
        listOfServers.stream()
            .forEach(hostWithPort -> webClient.get()
                .uri(String.format("http://%s/%s", hostWithPort, id))// URI
                .acceptCharset(StandardCharsets.UTF_8)
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .bodyToMono(User.class)
                .subscribe(resp -> users.add(resp)));
        
        int index = 0;
        while (users.isEmpty() && (index++) < 100)
        {
            TimeUnit.MILLISECONDS.sleep(10);
            log.info("index:{}, waitting......", index);
        }
        if (users.isEmpty())
        {
            throw new RuntimeException("查询超时，无返回值");
        }
        return users;
    }
    
    @ApiOperation("查询用户 by execute")
    @GetMapping("/v0/user/{id}")
    public List<User> findByExecute(@PathVariable Long id)
        throws InterruptedException
    {
        // List<User> users = new ArrayList<User>();
        // TODO ArrayList users一定概率有null值
        // 原因:通过new ArrayList<>()初始化的大小是0，首次插入触发扩容，并发可能导致出现null值
        
        List<User> users = new CopyOnWriteArrayList<User>();
        listOfServers.stream()
            .forEach(hostWithPort -> executorService.execute(() -> webClient.get()
                .uri(String.format("http://%s/%s", hostWithPort, id))// URI
                .acceptCharset(StandardCharsets.UTF_8)
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .bodyToMono(User.class)
                .subscribe(resp -> users.add(resp))));
        
        int index = 0;
        while (users.isEmpty() && (index++) < 100)
        {
            TimeUnit.MILLISECONDS.sleep(10);
            log.info("index:{}, waitting......", index);
        }
        if (users.isEmpty())
        {
            throw new RuntimeException("查询超时，无返回值");
        }
        return users;
    }
    
    @ApiOperation("查询用户 by submit")
    @GetMapping("/v1/user/{id}")
    public List<User> findBySubmit(@PathVariable Long id)
        throws InterruptedException
    {
        List<User> users = new CopyOnWriteArrayList<User>();
        listOfServers.stream()
            .forEach(hostWithPort -> executorService.submit(() -> webClient.get()
                .uri(String.format("http://%s/%s", hostWithPort, id))// URI
                .acceptCharset(StandardCharsets.UTF_8)
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .bodyToMono(User.class)
                .subscribe(resp -> users.add(resp)), users));
        
        int index = 0;
        while (users.isEmpty() && (index++) < 100)
        {
            TimeUnit.MILLISECONDS.sleep(10);
            log.info("index:{}, waitting......", index);
        }
        if (users.isEmpty())
        {
            throw new RuntimeException("查询超时，无返回值");
        }
        return users;
    }
    
    @ApiOperation("查询用户 by invokeAny")
    @GetMapping("/v2/user/{id}")
    public User findByInvokeAny(@PathVariable Long id)
        throws InterruptedException, ExecutionException, TimeoutException
    {
        return executorService.invokeAny(listOfServers.stream().map(hostWithPort -> new Callable<User>()
        {
            @Override
            public User call()
            {
                Mono<User> mono = webClient.get()
                    .uri(String.format("http://%s/%s", hostWithPort, id))// URI
                    .acceptCharset(StandardCharsets.UTF_8)
                    .accept(MediaType.APPLICATION_JSON)
                    .retrieve()
                    .bodyToMono(User.class);
                return mono.block();
            }
        }).collect(Collectors.toList()), 1000, TimeUnit.MILLISECONDS);
    }
    
    @ApiOperation("查询用户 by invokeAll")
    @GetMapping("/v3/user/{id}")
    public List<User> findByInvokeAll(@PathVariable Long id)
        throws InterruptedException
    {
        List<Future<User>> futures = executorService.invokeAll(listOfServers.stream().map(hostWithPort -> new Callable<User>()
        {
            @Override
            public User call()
            {
                Mono<User> mono = webClient.get()
                    .uri(String.format("http://%s/%s", hostWithPort, id))// URI
                    .acceptCharset(StandardCharsets.UTF_8)
                    .accept(MediaType.APPLICATION_JSON)
                    .retrieve()
                    .bodyToMono(User.class);
                return mono.block();
            }
        }).collect(Collectors.toList()), 1000, TimeUnit.MILLISECONDS);
        
        List<User> users = new ArrayList<User>();
        for (Future<User> future : futures)
        {
            try
            {
                users.add(future.get());
            }
            catch (Exception e)
            {
                log.error(e.getMessage(), e);
            }
        }
        return users;
    }
}
